/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flink.table.planner.plan.nodes.exec.stream;

import org.apache.gluten.velox.VeloxSourceBuilder;

import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecTableSourceScan;
import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
import org.apache.flink.table.planner.utils.ShortcutUtils;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;

/**
 * Streaming {@link ExecNode} that reads records from an external system described by a {@link
 * ScanTableSource}.
 *
 * <p>This variant carries Gluten-specific changes: the transformation produced by the parent
 * class is handed to {@link VeloxSourceBuilder#build} together with the resolved scan source, and
 * whatever that call returns becomes the result of the translation.
 */
@ExecNodeMetadata(
    name = "stream-exec-table-source-scan",
    version = 1,
    producedTransformations = CommonExecTableSourceScan.SOURCE_TRANSFORMATION,
    minPlanVersion = FlinkVersion.v1_15,
    minStateVersion = FlinkVersion.v1_15)
public class StreamExecTableSourceScan extends CommonExecTableSourceScan
    implements StreamExecNode<RowData> {

  /**
   * Creates a node with a newly allocated node id, a new {@link ExecNodeContext} for this class,
   * and a persisted configuration obtained from {@code tableConfig} via {@link
   * ExecNodeContext#newPersistedConfig}.
   *
   * @param tableConfig configuration from which the persisted node configuration is derived
   * @param tableSourceSpec specification of the table source to scan
   * @param outputType row type produced by this node
   * @param description description of this node
   */
  public StreamExecTableSourceScan(
      ReadableConfig tableConfig,
      DynamicTableSourceSpec tableSourceSpec,
      RowType outputType,
      String description) {
    this(
        ExecNodeContext.newNodeId(),
        ExecNodeContext.newContext(StreamExecTableSourceScan.class),
        ExecNodeContext.newPersistedConfig(StreamExecTableSourceScan.class, tableConfig),
        tableSourceSpec,
        outputType,
        description);
  }

  /**
   * Creator used by Jackson (see {@link JsonCreator}); every argument is bound to a JSON property.
   * All arguments are forwarded unchanged to the parent constructor, with an empty list supplied
   * as its fifth argument.
   *
   * @param id node id
   * @param context context of this node
   * @param persistedConfig configuration stored with this node
   * @param tableSourceSpec specification of the table source to scan
   * @param outputType row type produced by this node
   * @param description description of this node
   */
  @JsonCreator
  public StreamExecTableSourceScan(
      @JsonProperty(FIELD_NAME_ID) int id,
      @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
      @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
      @JsonProperty(FIELD_NAME_SCAN_TABLE_SOURCE) DynamicTableSourceSpec tableSourceSpec,
      @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
      @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
    super(
        id,
        context,
        persistedConfig,
        tableSourceSpec,
        Collections.emptyList(),
        outputType,
        description);
  }

  /**
   * Builds the transformation that reads from the given {@link InputFormat}.
   *
   * @param env environment used to create the input
   * @param inputFormat format that is read from
   * @param outputTypeInfo type information of the produced rows
   * @param operatorName name assigned to the created source operator
   * @return the transformation backing the named source
   */
  @Override
  public Transformation<RowData> createInputFormatTransformation(
      StreamExecutionEnvironment env,
      InputFormat<RowData, ?> inputFormat,
      InternalTypeInfo<RowData> outputTypeInfo,
      String operatorName) {
    // For streaming, StreamExecutionEnvironment.createInput() is preferred over
    // addLegacySource() because it takes care of checkpointing.
    final Transformation<RowData> inputTransformation =
        env.createInput(inputFormat, outputTypeInfo).name(operatorName).getTransformation();
    return inputTransformation;
  }

  /**
   * Resolves the {@link ScanTableSource} from the table source spec, lets the parent class produce
   * its transformation, and returns the result of passing both to {@link
   * VeloxSourceBuilder#build}.
   *
   * @param planner planner providing the Flink context and the type factory
   * @param config configuration of this node, forwarded to the parent translation
   * @return the transformation returned by {@link VeloxSourceBuilder#build}
   */
  @Override
  protected Transformation<RowData> translateToPlanInternal(
      PlannerBase planner, ExecNodeConfig config) {
    // --- Begin Gluten-specific code changes ---
    // The scan source is resolved first, before the parent translation runs.
    final ScanTableSource scanSource =
        getTableSourceSpec()
            .getScanTableSource(
                planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner));
    final Transformation<RowData> parentPlan = super.translateToPlanInternal(planner, config);
    return VeloxSourceBuilder.build(parentPlan, scanSource);
    // --- End Gluten-specific code changes ---
  }
}
